KAFKA-20115: Group coordinator fails to unload metadata when no longer leader or follower #21396

Open
brandboat wants to merge 4 commits into apache:4.2 from brandboat:KAFKA-20115

@brandboat
Member

@brandboat brandboat commented Feb 3, 2026

When a broker loses leadership of a __consumer_offsets partition while a
write batch is pending, the coordinator unload process fails because
freeCurrentBatch() tries to read the partition writer configuration,
which throws a NOT_LEADER_OR_FOLLOWER exception.

This commit fixes the issue by skipping buffer release during unload
since all related resources are being closed anyway.
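
To make the failure mode concrete, here is a minimal, self-contained sketch of the behaviour described above. It is not the CoordinatorRuntime code: the classes `PartitionWriter` and `Context`, the `maxBatchSize()` lookup, and the `tryUnload` helper are hypothetical stand-ins, and only the `failCurrentBatch(cause, freeBuffer)` shape mirrors the change in this PR.

```java
import java.util.ArrayList;
import java.util.List;

class UnloadSketch {
    /** Stand-in for the error raised once the broker no longer hosts the partition. */
    static class NotLeaderOrFollowerException extends RuntimeException {
        NotLeaderOrFollowerException(String message) { super(message); }
    }

    /** Hypothetical partition writer: config lookups fail after leadership is lost. */
    static class PartitionWriter {
        boolean leader = true;

        int maxBatchSize() {
            if (!leader) throw new NotLeaderOrFollowerException("NOT_LEADER_OR_FOLLOWER");
            return 1024;
        }
    }

    /** Hypothetical coordinator context holding a single pending batch. */
    static class Context {
        final PartitionWriter writer;
        final List<String> log = new ArrayList<>();
        boolean hasPendingBatch = true;
        boolean unloaded = false;

        Context(PartitionWriter writer) { this.writer = writer; }

        void freeCurrentBatch() {
            // Assumption: releasing the buffer consults the writer configuration.
            int max = writer.maxBatchSize();
            log.add("released buffer (max=" + max + ")");
            hasPendingBatch = false;
        }

        void failCurrentBatch(RuntimeException cause, boolean freeBuffer) {
            if (!hasPendingBatch) return;
            log.add("failed batch: " + cause.getMessage());
            if (freeBuffer) {
                freeCurrentBatch();
            } else {
                // Skip the release; the caller is closing everything anyway.
                hasPendingBatch = false;
            }
        }

        void unload(boolean freeBuffer) {
            failCurrentBatch(new RuntimeException("NOT_COORDINATOR"), freeBuffer);
            unloaded = true; // only reached if failing the batch did not throw
        }
    }

    /** Runs an unload after leadership was lost and reports whether it completed. */
    static boolean tryUnload(boolean freeBuffer) {
        PartitionWriter writer = new PartitionWriter();
        writer.leader = false;
        Context context = new Context(writer);
        try {
            context.unload(freeBuffer);
        } catch (NotLeaderOrFollowerException e) {
            // The unload aborted part-way, which is the bug described above.
        }
        return context.unloaded;
    }

    public static void main(String[] args) {
        System.out.println("with buffer release, unload completed: " + tryUnload(true));
        System.out.println("without buffer release, unload completed: " + tryUnload(false));
    }
}
```

In this sketch the unload only completes when the buffer release is skipped, which is the trade-off the fix makes: the buffer is not returned for reuse, but nothing needs it once the context is being closed.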

…r leader or follower

Signed-off-by: Kuan-Po Tseng <brandboat@gmail.com>
@dajac
Member

dajac commented Feb 3, 2026

cc @clolov This is a potential blocker for 4.2.

@chia7712
Member

chia7712 commented Feb 3, 2026

> cc @clolov This is a potential blocker for 4.2.

I gave him the heads-up about the bad news offline. Hopefully, he's still my friend after this. 😢

Contributor

@squah-confluent squah-confluent left a comment

Thank you for the fix!
Some initial thoughts:

}

@Test
public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
Contributor

Could we move this together with the rest of the testScheduleUnloading* tests?

new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
Contributor

nit: Could we use DEFAULT_WRITE_TIMEOUT here unless there's a reason to use a different timeout?

Suggested change
- .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)

Member Author

Nice catch, thanks!

Signed-off-by: Kuan-Po Tseng <brandboat@gmail.com>
}

@Test
public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
Member

I'm thinking about adding integration tests. For example:

    @ClusterTest(
            brokers = 2,
            types = {Type.KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
                @ClusterConfigProperty(key = "group.coordinator.append.linger.ms", value = "3000")
            }
    )
    public void test(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
            producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
        }

        try (var admin = clusterInstance.admin()) {
            admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
        }

        try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
             var admin = clusterInstance.admin()) {
            consumer.subscribe(List.of("topic"));
            while (consumer.poll(Duration.ofMillis(100)).isEmpty()) {
                // empty body
            }
            // append records to coordinator
            consumer.commitSync();

            // unload the coordinator by changing leader (0 -> 1)
            admin.alterPartitionReassignments(Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
                    Optional.of(new NewPartitionReassignment(List.of(1))))).all().get();
        }

        Function<GroupCoordinator, List<TopicPartition>> partitionsInGroupMetrics = service -> assertDoesNotThrow(() -> {
            var f0 = GroupCoordinatorService.class.getDeclaredField("groupCoordinatorMetrics");
            f0.setAccessible(true);
            var f1 = GroupCoordinatorMetrics.class.getDeclaredField("shards");
            f1.setAccessible(true);
            return List.copyOf(((Map<TopicPartition, ?>) f1.get(f0.get(service))).keySet());
        });

        // the offset partition should NOT be hosted by multiple coordinators
        var tps = clusterInstance.brokers().values().stream()
                .flatMap(b -> partitionsInGroupMetrics.apply(b.groupCoordinator()).stream()).toList();
        assertEquals(1, tps.size());
    }

WDYT?

Member Author

@brandboat brandboat Feb 4, 2026

Sure, why not? Thanks for the thorough integration test!

Signed-off-by: Kuan-Po Tseng <brandboat@gmail.com>
  deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
- failCurrentBatch(Errors.NOT_COORDINATOR.exception());
+ // There is no need to free the current batch, as we will be closing all related resources anyway.
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception(), false);
Member

Would you mind adding a more readable method? For example:

    private void failCurrentBatchWithoutRelease(Throwable t) {
        failCurrentBatch(t, false);
    }

@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
}
)
public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
Member

@brandboat could you open a PR against trunk to improve the test coverage?

Member Author

Here you go: #21403

Signed-off-by: Kuan-Po Tseng <brandboat@gmail.com>